package com.atguigu.gmall.realtime.app.dws;

import com.atguigu.gmall.realtime.bean.VisitorStats;
import com.atguigu.gmall.realtime.utils.ClickHouseUtil;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/* Reference only — ClickHouse sink-table DDL kept for documentation; it is NOT executed by this class:
        String clickhousetable ="create table visitor_stats_czg (\n" +
                " stt String,\n" +
                " edt String,\n" +
                " vc String,\n" +
                " ch String ,\n" +
                " ar String ,\n" +
                " is_new String ,\n" +
                " uv_ct bigint,\n" +
                " pv_ct bigint,\n" +
                " sv_ct bigint,\n" +
                " uj_ct bigint,\n" +
                " dur_sum bigint,\n" +
                " ts bigint \n" +
                " ) \n" +
                " WITH (\n" +
                "          'connector' = 'clickhouse',\n" +
                "          'url' = 'clickhouse://hadoop102:8123',\n" +
                "          'table-name' = 'd_sink_table',\n" +
                "          'sink.batch-size' = '1000',\n" +
                "          'sink.partition-strategy' = 'hash',\n" +
                "          'sink.partition-key' = 'name')";
        tableEnv.executeSql(clickhousetable);


 */
/**
 * DWS visitor-statistics job.
 *
 * <p>Reads the {@code dwd_page_log} and {@code dwm_user_jump_detail} Kafka topics as
 * Flink SQL dynamic tables, computes four partial aggregates (PV, UV, session count,
 * bounce count) over 10-second tumbling event-time windows keyed by
 * {@code vc / ch / ar / is_new}, unions them into a single result, and sinks the rows
 * into ClickHouse table {@code visitor_stats_2021}.
 */
public class VisitorStatsAppNew {

    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment setup
        //1.1 Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set parallelism
        env.setParallelism(4);
        /*
        //1.3 Checkpoint settings (left disabled, presumably for local testing — TODO re-enable for production)
        env.enableCheckpointing(5000, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        StateBackend fsStateBackend = new FsStateBackend(
                "hdfs://hadoop202:8020/gmall/flink/checkpoint/ProductStatsApp");
        env.setStateBackend(fsStateBackend);
        System.setProperty("HADOOP_USER_NAME","atguigu");
        */
        //1.4 Create the Table environment
        EnvironmentSettings setting = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, setting);
        //TODO 2. Register the Kafka sources as dynamic tables.
        // Both Kafka input streams share the same JSON schema; only the topic differs.
        // Event time comes from the epoch-millis field `ts`, with a 3-second watermark delay.
        String strPageLog ="CREATE TABLE dwd_page_log (\n" +
                "    common ROW<ar STRING,uid STRING,os STRING,ch STRING,is_new STRING,md STRING, mid STRING, vc STRING,ba STRING>,\n" +
                "    page  ROW<page_id STRING , during_time BIGINT,last_page_id STRING>,\n" +
                "    ts BIGINT,\n" +
                "    rowtime  AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
                "    WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND \n" +
                ") WITH (\n" +
                "   'connector' = 'kafka', -- 使用 kafka connector\n" +
                "    'topic' = 'dwd_page_log', -- kafka主题\n" +
                "    'scan.startup.mode' = 'earliest-offset', -- 偏移量\n" +
                "    'properties.group.id' = 'group1', -- 消费者组\n" +
                "    'properties.bootstrap.servers' = 'hadoop102:9092', \n" +
                "    'format' = 'json', -- 数据源格式为json\n" +
                "    'json.fail-on-missing-field' = 'false',\n" +
                "    'json.ignore-parse-errors' = 'true'\n" +
                ")";
        // Source for bounce ("user jump") events produced by the DWM layer.

        String sqlDwmUserJumpDetail ="CREATE TABLE  dwm_user_jump_detail   (\n" +
                "    common ROW<ar STRING,uid STRING,os STRING,ch STRING,is_new STRING,md STRING, mid STRING, vc STRING,ba STRING>,\n" +
                "    page  ROW<page_id STRING , during_time BIGINT,last_page_id STRING>,\n" +
                "    ts BIGINT,\n" +
                "    rowtime  AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')),\n" +
                "    WATERMARK FOR rowtime AS rowtime - INTERVAL '3' SECOND \n" +
                ") WITH (\n" +
                "   'connector' = 'kafka', -- 使用 kafka connector\n" +
                "    'topic' = 'dwm_user_jump_detail', -- kafka主题\n" +
                "    'scan.startup.mode' = 'earliest-offset', -- 偏移量\n" +
                "    'properties.group.id' = 'group1', -- 消费者组\n" +
                "    'properties.bootstrap.servers' = 'hadoop102:9092', \n" +
                "    'format' = 'json', -- 数据源格式为json\n" +
                "    'json.fail-on-missing-field' = 'false',\n" +
                "    'json.ignore-parse-errors' = 'true'\n" +
                ")";
        // Register the two sources used by the UV / PV / session / bounce analyses below.
        tableEnv.executeSql(strPageLog);
       // tableEnv.executeSql(sqlDwmUniqueVisit);
        tableEnv.executeSql(sqlDwmUserJumpDetail);
        //TODO 3. Aggregate queries — each fills one metric column and zeroes the others,
        // so the four results can be unioned into the shared VisitorStats row layout.
        // 3.1 Page views (pv_ct) plus total page dwell time (dur_sum).
        String pageTableSQL ="select \n" +
                "            DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "            DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') edt , \n" +
                "            common.vc as vc , common.ch as ch, common.ar as ar, common.is_new  as is_new,\n" +
                "            sum(0) as uv_ct ,\n" +
                "            count(common.uid) as pv_ct ,\n" +
                "            sum(0) as sv_ct ,\n" +
                "            sum(0) as uj_ct ,\n" +
                "            sum(page.during_time) as dur_sum , UNIX_TIMESTAMP()*1000 ts " +
                "             from  dwd_page_log group by  TUMBLE(rowtime, INTERVAL '10' SECOND ),\n" +
                "             common.vc ,common.ch,common.ar,common.is_new\t";
        // 3.2 Unique visitors (uv_ct) — distinct uid per window.
        String useTableSQL="select \n" +
                "            DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "            DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') edt , \n" +
                "            common.vc as vc , common.ch as ch, common.ar as ar, common.is_new  as is_new,\n" +
                "            count( distinct common.uid)  as uv_ct ,\n" +
                "            sum(0) as pv_ct ,\n" +
                "            sum(0) as sv_ct ,\n" +
                "            sum(0) as uj_ct ,\n" +
                "            sum(0) as dur_sum , UNIX_TIMESTAMP()*1000 ts" +
                "             from  dwd_page_log group by  TUMBLE(rowtime, INTERVAL '10' SECOND ),\n" +
                "             common.vc ,common.ch,common.ar,common.is_new" ;
        // 3.3 Session count (sv_ct).
        // NOTE(review): a session *entry* page normally has last_page_id IS NULL; this
        // query keeps rows where it IS NOT NULL (matching the original author's comment,
        // but opposite to the usual definition) — confirm the intended semantics.
        String sessionSql ="select \n" +
                "            DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "            DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') edt , \n" +
                "            common.vc as vc , common.ch as ch, common.ar as ar, common.is_new  as is_new,\n" +
                "              sum(0) as uv_ct ,\n" +
                "            sum(0) as pv_ct ,\n" +
                "            count( distinct common.uid) as sv_ct ,\n" +
                "            sum(0) as uj_ct ,\n" +
                "            sum(0) as dur_sum\n" +
                "            ,UNIX_TIMESTAMP()*1000 ts  from  dwd_page_log\n" +
                "           WHERE page.last_page_id  is not null \n" +
                "   group by  TUMBLE(rowtime, INTERVAL '10' SECOND ),\n" +
                "             common.vc ,common.ch,common.ar,common.is_new" ;
        // 3.4 Bounce count (uj_ct) from the DWM jump-detail stream.
        String jumpSQL ="select  DATE_FORMAT(TUMBLE_START(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') stt,\n" +
                "            DATE_FORMAT(TUMBLE_END(rowtime, INTERVAL '10' SECOND ),'yyyy-MM-dd HH:mm:ss') edt , \n" +
                "            common.vc as vc , common.ch as ch, common.ar as ar, common.is_new  as is_new,\n" +
                "            sum(0) as uv_ct ,\n" +
                "            sum(0) as pv_ct ,\n" +
                "            sum(0) as sv_ct ,\n" +
                "               count( distinct common.uid) as uj_ct ,\n" +
                "            sum(0) as dur_sum\n" +
                "           , UNIX_TIMESTAMP()*1000 ts  from  dwm_user_jump_detail group by  TUMBLE(rowtime, INTERVAL '10' SECOND ),\n" +
                "             common.vc ,common.ch,common.ar,common.is_new ";

        // Union the four partial aggregates into one result to write to ClickHouse.
        // NOTE(review): plain UNION deduplicates (forcing an update/retract query);
        // UNION ALL is the usual choice for combining disjoint partial aggregates —
        // confirm before changing, since it alters the emitted rows.
        // StringBuilder (not StringBuffer): purely local, single-threaded use.
        StringBuilder sqlAll = new StringBuilder();
        sqlAll.append(pageTableSQL).append( " union  ").append( useTableSQL ).append(" union ").append(sessionSql)
                .append(" union ").append(jumpSQL)
                ;
        Table unionResult = tableEnv.sqlQuery(sqlAll.toString());
        //TODO 4. Convert the dynamic table to a retract stream:
        // f0 == true  -> insert/add record, f0 == false -> retraction of a prior record.
        DataStream<Tuple2<Boolean, VisitorStats>> visitorStatsDataStream = tableEnv.toRetractStream(unionResult, VisitorStats.class);
        //TODO 5. Sink into ClickHouse.
        // FIX: filter out retraction records (f0 == false) before mapping; the original
        // mapped every record to f1 unconditionally, so each retraction would have been
        // re-inserted into ClickHouse as a duplicate row.
        visitorStatsDataStream
                .filter(record -> record.f0)
                .map(new MapFunction<Tuple2<Boolean, VisitorStats>, VisitorStats>() {
                    @Override
                    public VisitorStats map(Tuple2<Boolean, VisitorStats> record) throws Exception {
                        return record.f1;
                    }
                })
                .addSink(
                        ClickHouseUtil.getJdbcSink("insert into visitor_stats_2021 values(?,?,?,?,?,?,?,?,?,?,?,?)")
                );
        // Note: this prints while the job graph is being built, before any data flows.
        System.out.println(" 出入数据成功 ok!!!!!!!!!!111111111111!!!!!");
        env.execute();
    }


}
